package com.jokerku.mini.schedule.service;

import com.alibaba.fastjson.JSON;
import com.jokerku.mini.schedule.common.Constants;
import com.jokerku.mini.schedule.domain.ExecOrder;
import com.jokerku.mini.schedule.leader.LeaderService;
import com.jokerku.mini.schedule.loadbalance.LoadBalanceFactory;
import com.jokerku.mini.schedule.loadbalance.LoadBalanceStrategy;
import com.jokerku.mini.schedule.sharding.ShardingService;
import com.jokerku.mini.schedule.task.ScheduledTask;
import com.jokerku.mini.schedule.domain.ZkPath;
import com.jokerku.mini.schedule.util.SleepUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @Author: guzq
 * @CreateTime: 2023/05/25 18:41
 * @Description: 心跳检测
 * 服务启动 1s 后,每隔 60s 检测一次
 * @Version: 1.0
 */
public class HeartbeatService {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatService.class);

    private static final HeartbeatService INSTANCE = new HeartbeatService();

    private ScheduledExecutorService scheduledExecutorService;

    private HeartbeatService() {
    }

    public static HeartbeatService getInstance() {
        return INSTANCE;
    }

    public void startRefreshSchedule() {
        scheduledExecutorService = Executors.newScheduledThreadPool(1);

        scheduledExecutorService.scheduleAtFixedRate(this::refresh, 1, 1, TimeUnit.SECONDS);
    }

    private void refresh() {
        // 过期节点清除
        clearExpireNode();
        // 检测任务状态
        checkScheduleStatus();
    }

    private void clearExpireNode() {
        CuratorFramework client = Constants.Global.client;
        LeaderService leaderService = Constants.Global.leaderService;
        try {
            // 获取ip
            String pathRootServerIp = ZkPath.buildServerRunningIpPath(Constants.Global.scheduleServerId);
            List<String> ipList = client.getChildren().forPath(pathRootServerIp);
            boolean hasExpireNode = false;
            for (String ip : ipList) {
                // 清除已下线的节点
                String pathRootServerIpValue = ZkPath.buildServerRunningIpPathValue(Constants.Global.scheduleServerId, ip);
                Stat stat = client.checkExists().forPath(pathRootServerIpValue);
                if (null != stat) {
                    // 超过 15s 删除
                    if (System.currentTimeMillis() - stat.getMtime() > 5000 * 3 && leaderService.isLeader()) {
                        client.delete().deletingChildrenIfNeeded().forPath(pathRootServerIpValue);
                        hasExpireNode = true;
                        logger.info("节点 [{}] 过期清除", ip);
                    } else {
                        // 更新心跳
                        if (Constants.Global.ip.equals(ip)) {
                            client.setData().forPath(pathRootServerIpValue, ip.getBytes(Constants.Global.CHARSET));
                        }
                    }
                }
            }
            if (hasExpireNode) {
                ShardingService.getInstance().setReShardingFlag();
                ShardingService.getInstance().shardingIfNecessary();
            }
        } catch (Exception e) {
            logger.error("clear expire node error", e);
        }
    }

    public void assignTask() {
        CuratorFramework client = Constants.Global.client;
        LeaderService leaderService = getLeaderService(Constants.Global.leaderService);

        if (null == leaderService || !leaderService.isLeader()) {
            return;
        }

        try {
            // 获取ip
            String pathRootServerIp = ZkPath.buildServerRunningIpPath(Constants.Global.scheduleServerId);
            List<String> ipList = client.getChildren().forPath(pathRootServerIp);

            for (List<ExecOrder> execOrderList : Constants.EXEC_ORDER_MAP.values()) {
                for (ExecOrder execOrder : execOrderList) {
                    // 分配任务 taskId -> ip
                    String taskPath = ZkPath.getRunningTaskPath(execOrder.getTaskId());
                    if (null != client.checkExists().forPath(taskPath)) {
                        client.delete().deletingChildrenIfNeeded().forPath(taskPath);
                    }
                    // 负载均衡
                    String ip = LoadBalanceFactory.getLoadBalanceService(LoadBalanceStrategy.Poll).getDispatchIp(ipList);
                    client.create().creatingParentsIfNeeded().forPath(taskPath, ip.getBytes(StandardCharsets.UTF_8));

                    logger.info("任务: {} 分配成功, 执行 ip: {}", execOrder.getTaskId(), ip);
                }
            }
        } catch (Exception e) {
            logger.error("assign task error", e);
        }

    }

    private LeaderService getLeaderService(LeaderService leaderService) {
        int retryTimes = 0;
        while (null == leaderService) {
            if (retryTimes++ > 5) {
                return null;
            } else {
                SleepUtil.sleep();
            }
            leaderService = Constants.Global.leaderService;
        }

        return leaderService;
    }


    private void checkScheduleStatus() {
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("mini schedule heart beat On-Site Inspection task");
            }

            Map<String, List<ExecOrder>> execOrderMap = Constants.EXEC_ORDER_MAP;
            Map<String, ScheduledTask> scheduledTaskMap = Constants.SCHEDULED_TASK_MAP;

            for (List<ExecOrder> execOrderList : execOrderMap.values()) {
                for (ExecOrder execOrder : execOrderList) {
                    // 判断当前任务是否正在执行
                    String taskId = execOrder.getTaskId();
                    ScheduledTask scheduledTask = scheduledTaskMap.get(taskId);
                    if (null == scheduledTask) {
                        continue;
                    }
                    // 路径拼装
                    String pathRootServerIpClassMethodValue = ZkPath.getServerTaskClassMethodValuePath(execOrder);
                    byte[] data = Constants.Global.client.getData().forPath(pathRootServerIpClassMethodValue);
                    ExecOrder oldExecOrder;
                    if (null != data) {
                        String oldJson = new String(data, Constants.Global.CHARSET);
                        oldExecOrder = JSON.parseObject(oldJson, ExecOrder.class);
                    } else {
                        // zk数据丢失则进行添加
                        oldExecOrder = new ExecOrder();
                        oldExecOrder.setBean(execOrder.getBean());
                        oldExecOrder.setBeanName(execOrder.getBeanName());
                        oldExecOrder.setMethodName(execOrder.getMethodName());
                        oldExecOrder.setCron(execOrder.getCron());
                        oldExecOrder.setAutoStartup(execOrder.getAutoStartup());
                        oldExecOrder.setDesc(execOrder.getDesc());
                    }
                    boolean canceled = scheduledTask.isCanceled();
                    oldExecOrder.setAutoStartup(!canceled);

                    // 临时节点
                    if (null == Constants.Global.client.checkExists().forPath(pathRootServerIpClassMethodValue)) {
                        continue;
                    }
                    // 更新任务json
                    Constants.Global.client.setData().forPath(pathRootServerIpClassMethodValue, JSON.toJSONString(oldExecOrder).getBytes(Constants.Global.CHARSET));

                    // 永久节点
                    String pathRootServerIpClassMethodStatus = ZkPath.getServerTaskClassMethodStatusPath(execOrder);
                    if (null == Constants.Global.client.checkExists().forPath(pathRootServerIpClassMethodStatus)) {
                        continue;
                    }
                    // 更新任务执行状态
                    Constants.Global.client.setData().forPath(pathRootServerIpClassMethodStatus, (oldExecOrder.getAutoStartup() ? "1" : "0").getBytes(Constants.Global.CHARSET));
                }
            }
        } catch (Exception ignore) {

        }
    }
}
